Conversation
There was a problem hiding this comment.
Pull request overview
This PR refactors the reducer architecture to reduce memory footprint by changing reducers from iterable stream wrappers to simple stateful aggregators. The key architectural change is that reducers no longer extend AbstractStream but implement a simpler ReducerInterface with a next() method for value-by-value processing.
Key Changes:
- Refactored all reducers (Sum, Min, Max, Count, Average, Callback) to use a stateful
next()method instead of wrapping collections as iterables - Introduced a new
Reduceoperator that wraps reducers and provides iteration capabilities - Changed aggregator access pattern from
$stream->aggregators['name']->valueto$stream->aggregated['name']for a cleaner API
Reviewed changes
Copilot reviewed 41 out of 41 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| src/RunOpenCode/Component/Dataset/src/Contract/ReducerInterface.php | Updated interface to remove iterable requirement and add next() method for stateful reduction |
| src/RunOpenCode/Component/Dataset/src/Contract/StreamInterface.php | Added $aggregated property to expose aggregated values directly |
| src/RunOpenCode/Component/Dataset/src/Contract/CollectorInterface.php | Renamed $aggregators to $aggregated for consistency |
| src/RunOpenCode/Component/Dataset/src/Reducer/*.php | Refactored all reducers (Sum, Min, Max, Count, Average, Callback) to implement stateful reduction via next() method |
| src/RunOpenCode/Component/Dataset/src/Operator/Reduce.php | New operator that wraps reducers and provides iteration capabilities |
| src/RunOpenCode/Component/Dataset/src/Operator/*.php | Updated type alias naming for consistency (added "Callable" suffix) |
| src/RunOpenCode/Component/Dataset/src/Stream.php | Made class final and updated callable parameter documentation to support optional key parameter |
| src/RunOpenCode/Component/Dataset/src/Collector/*.php | Updated to use new $aggregated property instead of $aggregators |
| src/RunOpenCode/Component/Dataset/src/Aggregator/Aggregator.php | Modified to wrap the new Reduce operator instead of raw reducers |
| src/RunOpenCode/Component/Dataset/src/AbstractStream.php | Added $aggregated computed property that maps aggregators to their values |
| src/RunOpenCode/Component/Dataset/src/functions.php | Updated reduce() and aggregate() functions to work with new reducer architecture |
| tests/**/*.php | Updated all tests to use new reducer API via reduce() helper function |
| docs/**/*.rst | Updated documentation to reflect new API and fix spelling errors |
Comments suppressed due to low confidence (1)
src/RunOpenCode/Component/Dataset/src/Collector/CursoredCollector.php:64
- The check for
$this->closedwas removed from thepreviousproperty getter, but it should remain. Without this check, callers can accesspreviousbefore iteration completes, which may return incorrect results since the offset calculation depends on iteration state.
if ($this->offset <= 0) {
return null;
}
return null === $this->limit ? 0 : \max(0, $this->offset - $this->limit);
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| Line 44 adds new column to the each row, ``converted`` which will convert all | ||
| amounts to ``EUR`` using the provided conversion rate. |
There was a problem hiding this comment.
The phrase "adds new column to the each row" should be "adds a new column to each row" (remove "the" before "each").
| /** | ||
| * Interface for dataset reducers. | ||
| * | ||
| * Each reducer is simple, stateful class instance which does |
There was a problem hiding this comment.
Should be "a simple" instead of just "simple" for grammatical correctness.
| * @extends AbstractStream<TKey, TValue> | ||
| */ | ||
| class Stream extends AbstractStream | ||
| final class Stream extends AbstractStream |
There was a problem hiding this comment.
Making the Stream class final is a breaking change that prevents users from extending it to add custom operators. The documentation comment above this line (line 35, which is outside the diff) states "You may extend this class to add your own custom operators," creating an inconsistency. Consider whether this breaking change is intentional, and if so, the documentation should be updated accordingly.
| throw new LogicException('Collector must be iterated first.'); | ||
| } | ||
|
|
||
| return $this->aggregated; |
There was a problem hiding this comment.
This getter references $this->aggregated recursively without a backing property, which will cause infinite recursion and a fatal error. The getter should return $this->collection instanceof StreamInterface ? $this->collection->aggregated : [] instead.
| Aggregators are a concept introduced by this library. The general idea is that | ||
| you can iterate over a stream with applied operators while simultaneously | ||
| calculating a reduced value in single pass. |
There was a problem hiding this comment.
The phrase "in single pass" would be clearer as "in a single pass".
| } | ||
|
|
||
| /** | ||
| * Denotes of collection has been fully iterated. |
There was a problem hiding this comment.
Should be "Denotes if" instead of "Denotes of".
No description provided.